@sgrebnov sgrebnov commented Oct 3, 2025

Which issue does this PR close?

This PR fixes partial writes similar to those reported here. Although the following code wraps the write plan in CoalescePartitionsExec to enforce single-partition input, the DataFusion optimizer can remove that node. A unit test was added to demonstrate this behavior.

        let write_plan = Arc::new(IcebergWriteExec::new(
            self.table.clone(),
            input,
            self.schema.clone(),
        ));

        // Merge the outputs of write_plan into one so we can commit all files together
        let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(write_plan));

        Ok(Arc::new(IcebergCommitExec::new(
            self.table.clone(),
            catalog,
            coalesce_partitions,
            self.schema.clone(),
        )))

Example plan (observe no CoalescePartitionsExec)

explain format tree insert into task_history_sink select * from runtime.task_history;
+---------------+-------------------------------+
| plan_type     | plan                          |
+---------------+-------------------------------+
| physical_plan | ┌───────────────────────────┐ |
|               | │     IcebergCommitExec     │ |
|               | │    --------------------   │ |
|               | │ IcebergCommitExec: table: │ |
|               | │   team_app.task_history   │ |
|               | └─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐ |
|               | │      IcebergWriteExec     │ |
|               | │    --------------------   │ |
|               | │  IcebergWriteExec: table: │ |
|               | │   team_app.task_history   │ |
|               | └─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐ |
|               | │       ProjectionExec      │ |
|               | │    --------------------   │ |
|               | │      captured_output:     │ |
|               | │      captured_output      │ |
|               | │                           │ |
|               | │         end_time:         │ |
|               | │ CAST(end_time AS Timestamp│ |
|               | │    (Microsecond, None))   │ |
|               | │                           │ |
|               | │       error_message:      │ |
|               | │       error_message       │ |
|               | │                           │ |
|               | │   execution_duration_ms:  │ |
|               | │   execution_duration_ms   │ |
|               | │                           │ |
|               | │        input: input       │ |
|               | │                           │ |
|               | │          labels:          │ |
|               | │ CAST(labels AS Map(Field {│ |
|               | │     name: "key_value",    │ |
|               | │     data_type: Struct(    │ |
|               | │   [Field { name: "key",   │ |
|               | │      data_type: Utf8,     │ |
|               | │      nullable: false,     │ |
|               | │         dict_id: 0,       │ |
|               | │       dict_is_ordered     │ |
|               | │    : false, metadata: {   │ |
|               | │    "PARQUET:field_id":    │ |
|               | │  "12"} }, Field { name:   │ |
|               | │    "value", data_type:    │ |
|               | │    Utf8, nullable: true   │ |
|               | │            ...            │ |
|               | └─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐ |
|               | │      RepartitionExec      │ |
|               | │    --------------------   │ |
|               | │ partition_count(in->out): │ |
|               | │          1 -> 14          │ |
|               | │                           │ |
|               | │    partitioning_scheme:   │ |
|               | │    RoundRobinBatch(14)    │ |
|               | └─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐ |
|               | │     BytesProcessedExec    │ |
|               | │    --------------------   │ |
|               | │     BytesProcessedExec    │ |
|               | └─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐ |
|               | │     SchemaCastScanExec    │ |
|               | │    --------------------   │ |
|               | │     SchemaCastScanExec    │ |
|               | └─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐ |
|               | │       DataSourceExec      │ |
|               | │    --------------------   │ |
|               | │        bytes: 88176       │ |
|               | │       format: memory      │ |
|               | │          rows: 6          │ |
|               | └───────────────────────────┘ |
|               |                               |
+---------------+-------------------------------+

What changes are included in this PR?

This PR adds a required_input_distribution setting for IcebergWriteExec so that DataFusion automatically coalesces input partitions before commit, similar to DataFusion's DataSinkExec.
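
To illustrate why a declared requirement survives optimization while a hand-inserted node may not, here is a minimal toy model in plain Rust. It is only a sketch: the `Plan` enum and the `enforce_distribution` function are hypothetical names invented for this example and are not DataFusion's or iceberg-rust's API. The toy attaches the single-partition requirement to the commit node, which is where the plans in this description show CoalescePartitionsExec; the planner pass then inserts a coalesce step whenever the commit's input has more than one partition.

```rust
// Toy model only: these types imitate the idea and are NOT DataFusion's API.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Source { partitions: usize },
    Write(Box<Plan>),
    Coalesce(Box<Plan>),
    Commit(Box<Plan>),
}

impl Plan {
    /// Number of output partitions this node produces.
    fn output_partitions(&self) -> usize {
        match self {
            Plan::Source { partitions } => *partitions,
            // The write node keeps its input partitioning.
            Plan::Write(input) => input.output_partitions(),
            // Coalesce and commit always produce a single partition.
            Plan::Coalesce(_) | Plan::Commit(_) => 1,
        }
    }
}

/// Hypothetical planner pass: the commit node requires a single input
/// partition, so a coalesce node is inserted whenever its input has more.
fn enforce_distribution(plan: Plan) -> Plan {
    match plan {
        Plan::Commit(input) => {
            let input = enforce_distribution(*input);
            let input = if input.output_partitions() > 1 {
                Plan::Coalesce(Box::new(input))
            } else {
                input
            };
            Plan::Commit(Box::new(input))
        }
        Plan::Write(input) => Plan::Write(Box::new(enforce_distribution(*input))),
        Plan::Coalesce(input) => Plan::Coalesce(Box::new(enforce_distribution(*input))),
        source @ Plan::Source { .. } => source,
    }
}

/// Builds a commit-over-write plan on a source with the given partition count,
/// with no manually inserted coalesce node.
fn plan_without_coalesce(partitions: usize) -> Plan {
    Plan::Commit(Box::new(Plan::Write(Box::new(Plan::Source { partitions }))))
}
```

Running `enforce_distribution(plan_without_coalesce(3))` in this toy yields a commit node over a coalesce node over the write node, the same shape as the After plan in this description. Because the coalesce step is derived from the requirement rather than inserted by hand, the pass can always re-derive it.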

test_datafusion_execution_partitioned_source can be used to observe the behavior before and after.

Before

Physical plan: 
IcebergCommitExec: table=test_namespace.test_table_partitioning
  RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=3
    IcebergWriteExec: table=test_namespace.test_table_partitioning
      DataSourceExec: partitions=3, partition_sizes=[1, 1, 1]

After

IcebergCommitExec: table=test_namespace.test_table
  CoalescePartitionsExec
    IcebergWriteExec: table=test_namespace.test_table
      DataSourceExec: partitions=3, partition_sizes=[1, 1, 1]

Are these changes tested?

Added the test_datafusion_execution_partitioned_source unit test; also tested manually.

@sgrebnov sgrebnov changed the title Improve IcebergCommitExec to correctly specify properties schema fix: prevent DataFusion RepartitionExec insertion into IcebergCommitExec Oct 3, 2025
@sgrebnov sgrebnov changed the title fix: prevent DataFusion RepartitionExec insertion into IcebergCommitExec fix: ensure CoalescePartitionsExec is enabled for IcebergCommitExec Oct 3, 2025
@sgrebnov sgrebnov marked this pull request as draft October 3, 2025 19:44
@sgrebnov sgrebnov force-pushed the sgrebnov/1003-upstream-partitioned-source-write branch from 8d38a28 to 2e09238 Compare October 3, 2025 20:52
@sgrebnov sgrebnov marked this pull request as ready for review October 3, 2025 20:53
@liurenjie1024 liurenjie1024 left a comment

Thanks @sgrebnov for this fix!

@liurenjie1024 liurenjie1024 merged commit 1de3315 into apache:main Oct 11, 2025
16 checks passed